package drds.plus.repository.mysql.spi;

import drds.plus.common.jdbc.SetParameterMethodAndArgs;
import drds.plus.datanode.api.Connection;
import drds.plus.datanode.api.DatasourceManager;
import drds.plus.datanode.api.PreparedStatement;
import drds.plus.executor.ExecuteContext;
import drds.plus.executor.ExecutorException;
import drds.plus.executor.cursor.cursor.IAffectRowCursor;
import drds.plus.executor.cursor.cursor.ISortingCursor;
import drds.plus.executor.cursor.cursor.impl.common.AffectRowCursor;
import drds.plus.executor.cursor.cursor_metadata.CursorMetaData;
import drds.plus.executor.record_codec.record.Record;
import drds.plus.executor.row_values.ResultSetRowValues;
import drds.plus.executor.row_values.RowValues;
import drds.plus.executor.table.ITable;
import drds.plus.executor.transaction.Transaction;
import drds.plus.executor.transaction.strict_write_with_non_transaction_cross_database_read.ReadWrite;
import drds.plus.repository.mysql.common.ResultSetAutoCloseConnection;
import drds.plus.repository.mysql.common.ResultSetWrapper;
import drds.plus.repository.mysql.cursor.ResultSetCursor;
import drds.plus.repository.mysql.execute_plan_to_sql.ExecutePlanVisitor;
import drds.plus.repository.mysql.execute_plan_to_sql.SqlAndParameter;
import drds.plus.sql_process.abstract_syntax_tree.configuration.IndexMapping;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExecutePlan;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.dml.IPut;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.dml.PutType;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Query;
import drds.tools.$;
import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
 * api 方法执行相关的数据封装. 每个需要执行的cursor都可以持有这个对象进行数据库操作。 preparedStatement ..
 * fuxk java ，没有多继承。。只能用组合。。你懂的。。 类不是线程安全的哦亲
 */
@Slf4j
public class JdbcHandler implements IJdbcHandler {

    protected Transaction transaction = null;
    protected Connection connection = null;
    protected ResultSet resultSet = null;
    protected PreparedStatement preparedStatement = null;
    protected ExecutionType executionType = null;
    protected RowValues currentRowData = null;
    protected RowValues prevRowData = null;
    protected CursorMetaData cursorMetaData;
    protected boolean isStreaming = false;
    protected String dataNodeId = null;
    protected DatasourceManager datasourceManager = null;
    protected ExecuteContext executeContext = null;
    protected boolean initPrev = false;
    protected ExecutePlan executePlan;

    public JdbcHandler(ExecuteContext executeContext) {
        this.executeContext = executeContext;
    }

    public void executeQuery(CursorMetaData cursorMetaData, boolean isStreaming) throws SQLException {
        setCursorMetaData(cursorMetaData, isStreaming);
        SqlAndParameter sqlAndParameter = null;
        if (executeContext.getParameters() != null) {

            // 查询语句不支持batch模式
            if (executeContext.getParameters().isBatch()) {
                throw new ExecutorException("batch not supported executePlan chars");
            }
        }
        sqlAndParameter = new SqlAndParameter();
        if (executePlan.getSql() != null) {
            sqlAndParameter.sql = executePlan.getSql();
            if (executeContext.getParameters() != null) {
                sqlAndParameter.indexToSetParameterMethodAndArgsMap = executeContext.getParameters().getIndexToSetParameterMethodAndArgsMap();
            } else {
                sqlAndParameter.indexToSetParameterMethodAndArgsMap = new HashMap<Integer, SetParameterMethodAndArgs>();
            }
        } else {
            this.cursorMetaData.setIsSureLogicalIndexEqualActualIndex(true);
            if (executePlan instanceof Query) {
                ((Query) executePlan).setTopQuery(true);
                ExecutePlanVisitor executePlanVisitor = new ExecutePlanVisitor(executePlan, null, null, null, null, true);
                executePlan.accept(executePlanVisitor);
                sqlAndParameter.sql = executePlanVisitor.getString();
                sqlAndParameter.indexToSetParameterMethodAndArgsMap = executePlanVisitor.getOutPutIndexToSetParameterMethodAndArgsMap();
            }
        }

        try {

            executionType = ExecutionType.get;
            connection = transaction.getConnection(dataNodeId, datasourceManager, ReadWrite.read);

            preparedStatement = prepareStatement(sqlAndParameter.sql, connection, executeContext, false);
            if (isStreaming) {
                // 当prev的时候 不能设置
                preparedStatement.setFetchSize(Integer.MIN_VALUE);
            }

            Map<Integer, SetParameterMethodAndArgs> indexToSetParameterMethodAndArgsMap = sqlAndParameter.indexToSetParameterMethodAndArgsMap;
            SetParameterMethodAndArgs.setParameters((java.sql.PreparedStatement) preparedStatement.statementWrapper, indexToSetParameterMethodAndArgsMap);

            ResultSet resultSet = new ResultSetWrapper(preparedStatement.executeQuery().resultSet, this);

            this.resultSet = resultSet;
        } catch (Throwable e) {
            try {
                // 关闭自提交的链接
                close();
            } finally {
                if (e.getMessage().contains("SqlType is Not Support")) {
                    // 返回一个空结果
                    preparedStatement = connection.prepareDataNodePreparedStatement("selectStatement 1");
                    this.resultSet = new ResultSetWrapper(new ResultSetAutoCloseConnection(preparedStatement.executeQuery().resultSet, connection, preparedStatement), this);
                } else {
                    throw new ExecutorException(e);
                }
            }
        } finally {

        }
    }

    public IAffectRowCursor executeUpdate(ExecuteContext executeContext, IPut put, ITable table, IndexMapping indexMapping) throws SQLException {
        SqlAndParameter sqlAndParameter = new SqlAndParameter();
        if (put.getSql() != null) {
            sqlAndParameter.sql = put.getSql();
            if (executeContext.getParameters() != null) {
                sqlAndParameter.indexToSetParameterMethodAndArgsMap = executeContext.getParameters().getIndexToSetParameterMethodAndArgsMap();
            } else {
                sqlAndParameter.indexToSetParameterMethodAndArgsMap = new HashMap<Integer, SetParameterMethodAndArgs>();
            }
        } else {
            ExecutePlanVisitor executePlanVisitor = new ExecutePlanVisitor(executePlan, executeContext.getParameters() != null ? executeContext.getParameters().getFirstIndexToSetParameterMethodAndArgsMap() : null, null, null, null, true);
            put.accept(executePlanVisitor);

            sqlAndParameter.sql = executePlanVisitor.getString();
            sqlAndParameter.indexToSetParameterMethodAndArgsMap = executePlanVisitor.getOutPutIndexToSetParameterMethodAndArgsMap();
            sqlAndParameter.newIndexToOldIndexMap = executePlanVisitor.getNewParamIndexToOldMap();
        }

        boolean isInsert = false;
        if (PutType.insert.equals(put.getPutType())) {
            isInsert = true;
        }

        boolean isBatch = executeContext.getParameters() != null && executeContext.getParameters().isBatch();
        try {
            // 可能执行过程有失败，需要释放链接
            connection = transaction.getConnection(dataNodeId, datasourceManager, ReadWrite.write);
            preparedStatement = prepareStatement(sqlAndParameter.sql, connection, executeContext, isInsert);
            int affectRows = 0;
            if (isBatch) {
                for (Object o : put.getBatchIndexList()) {
                    Integer batchIndex = (Integer) o;

                    Map<Integer, SetParameterMethodAndArgs> indexToSetParameterMethodAndArgsMapList = executeContext.getParameters().getIndexToSetParameterMethodAndArgsMapList().get(batchIndex);

                    if (sqlAndParameter.newIndexToOldIndexMap != null) {
                        for (Entry<Integer, Integer> newIndexToOldIndexMap : sqlAndParameter.newIndexToOldIndexMap.entrySet()) {
                            sqlAndParameter.indexToSetParameterMethodAndArgsMap.get(newIndexToOldIndexMap.getKey()).setValue(indexToSetParameterMethodAndArgsMapList.get(newIndexToOldIndexMap.getValue()).getValue());
                        }
                    } else {
                        sqlAndParameter.indexToSetParameterMethodAndArgsMap = indexToSetParameterMethodAndArgsMapList; // 使用hint+batch时,绑定变量下标不会做变化
                    }
                    SetParameterMethodAndArgs.setParameters((java.sql.PreparedStatement) preparedStatement.statementWrapper, sqlAndParameter.indexToSetParameterMethodAndArgsMap);
                    preparedStatement.addBatch();
                }

                int[] nn = preparedStatement.executeBatch();

                for (int n : nn) {
                    affectRows += n;
                }
            } else {
                SetParameterMethodAndArgs.setParameters((java.sql.PreparedStatement) preparedStatement.statementWrapper, sqlAndParameter.indexToSetParameterMethodAndArgsMap);
                if (log.isDebugEnabled()) {
                    log.debug("sqlAndParameter:\n" + sqlAndParameter);
                }

                affectRows = preparedStatement.executeUpdate();
            }
            UpdateResultWrapper updateResultWrapper = new UpdateResultWrapper(affectRows, this);
            executionType = ExecutionType.put;
            this.resultSet = updateResultWrapper;

            int i = resultSet.getInt(UpdateResultWrapper.AFFECT_ROW);
            IAffectRowCursor affectRowCursor = new AffectRowCursor(i);
            if (isInsert) {
                ResultSet lastInsertIdResult = null;
                try {
                    lastInsertIdResult = preparedStatement.getGeneratedKeys().resultSet;
                    List<Long> generatedKeys = new ArrayList<Long>();
                    while (lastInsertIdResult.next()) {
                        long id = lastInsertIdResult.getLong(1);
                        if (id != 0) {
                            executeContext.getSession().setLastInsertId(id);
                        }

                        generatedKeys.add(id);
                    }
                    executeContext.getSession().setGeneratedKeys(generatedKeys);
                } finally {
                    if (lastInsertIdResult != null) {
                        lastInsertIdResult.close();
                    }
                }
            }
            return affectRowCursor;
        } catch (Throwable e) {
            throw new RuntimeException(e);
        } finally {

            close();
        }
    }


    public ISortingCursor getResultCursor() {

        if (executionType == ExecutionType.put) {
            throw new IllegalAccessError("impossible");
        } else if (executionType == ExecutionType.get) {
            // get的时候只会有一个结果集
            ResultSet resultSet = this.resultSet;
            return new ResultSetCursor(resultSet, this);
        } else {
            return null;
        }

    }

    protected PreparedStatement getPreparedStatement() {
        return preparedStatement;
    }


    protected void setCursorMetaData(CursorMetaData cursorMetaData, boolean isStreaming) {
        if (this.cursorMetaData == null) {
            this.cursorMetaData = cursorMetaData;
        }

        if (isStreaming != this.isStreaming) {
            this.isStreaming = isStreaming;
        }
    }

    public void setDatasourceManager(DatasourceManager datasourceManager) {
        this.datasourceManager = datasourceManager;
    }

    protected boolean closeStreaming(PreparedStatement dataNodePreparedStatement, ResultSet resultSet) throws SQLException {
        dataNodePreparedStatement.cancel();
        return false;
    }

    public void close() throws SQLException {
        boolean hasRealClose = false;
        try {
            /*
             * 非流计算的时候，用普通的close方法，而如果是streaming的情况下 将按以下模式关闭：
             * http://jira.taobao.ali.com/browse/ANDOR-149
             * http://gitlab.alibaba-inc.com/andor/issues/1835
             */
            if (!isStreaming) {
                if (resultSet != null) {
                    resultSet.close();
                    resultSet = null;
                }
            } else {
                try {
                    if (resultSet != null) {
                        boolean hasNext = true;
                        try {
                            hasNext = resultSet.next();
                        } catch (SQLException e) { // 可能已经关闭了，直接忽略
                            hasNext = false;
                        }

                        if (hasNext) { // 尝试获取一下是否有后续记录，如果数据已经取完了，那就直接关闭
                            hasRealClose = closeStreaming(this.preparedStatement, resultSet);
                        } else {
                            resultSet.close();
                            resultSet = null;
                        }
                    }
                } catch (Throwable e) {
                    if (resultSet != null) {
                        resultSet.close();
                        resultSet = null;
                    }

                    throw new RuntimeException(e);
                }
            }
        } finally {
            if (!hasRealClose && preparedStatement != null) {
                preparedStatement.setFetchSize(0);
                preparedStatement.close();
                preparedStatement = null;
            } else {
                preparedStatement = null;
            }
        }

        if (this.connection != null) {
            this.executeContext.getTransaction().tryClose(this.dataNodeId, this.connection);
        }

        executionType = null;
    }

    public boolean skipTo(Record key, CursorMetaData cursorMetaData) throws SQLException {
        checkInitedInRsNext();
        throw new RuntimeException("暂时不支持skip to");
    }

    public RowValues next() throws SQLException {
        if (datasourceManager == null) {
            throw new IllegalArgumentException("datasourceManager is null");
        }

        checkInitedInRsNext();
        prevRowData = currentRowData;
        try {
            if (resultSet == null) {
                return null;
            }
        } catch (Exception ex) {
            return null;
        }
        if (resultSet.next()) {
            currentRowData = new ResultSetRowValues(cursorMetaData, resultSet);

        } else {
            currentRowData = null;
        }

        return currentRowData;
    }

    protected PreparedStatement prepareStatement(String sql, Connection connection, ExecuteContext executeContext, boolean isInsert) throws SQLException {
        if (this.preparedStatement != null) {
            throw new IllegalStateException("上一个请求还未执行完毕");
        }
        if (connection == null) {
            throw new IllegalStateException("should not be here");
        }

        StringBuilder sb = new StringBuilder();

        if (executeContext.getGroupHint() != null) {
            // 如果有group directlyRouteCondition，传递一下hint
            sb.append(executeContext.getGroupHint());
        }

        String ap = sb.toString();
        if ($.isNotNullAndNotEmpty(ap)) {
            sql = ap + sql;
        }

        int autoGeneratedKeys = executeContext.getAutoGeneratedKeys();
        if (isInsert) {
            autoGeneratedKeys = Statement.RETURN_GENERATED_KEYS;
        }


        PreparedStatement dataNodePreparedStatement = null;
        if (autoGeneratedKeys != -1) {
            dataNodePreparedStatement = connection.prepareDataNodePreparedStatement(sql, autoGeneratedKeys);
        } else {
            dataNodePreparedStatement = connection.prepareDataNodePreparedStatement(sql);
        }

        this.preparedStatement = dataNodePreparedStatement;
        return dataNodePreparedStatement;
    }

    public RowValues first() throws SQLException {
        resultSet.beforeFirst();
        resultSet.next();
        currentRowData = new ResultSetRowValues(cursorMetaData, resultSet);
        return currentRowData;

    }

    public RowValues last() throws SQLException {
        resultSet.afterLast();
        resultSet.previous();
        currentRowData = new ResultSetRowValues(cursorMetaData, resultSet);
        return currentRowData;

    }

    public RowValues getCurrentRowData() {
        return currentRowData;
    }

    protected void checkInitedInRsNext() {
        if (!isInited()) {
            throw new IllegalArgumentException("not inited");
        }
    }

    public boolean isInited() {
        return resultSet != null;
    }

    public void beforeFirst() throws SQLException {
        this.close();
        this.executeQuery(cursorMetaData, isStreaming);
        currentRowData = null;
    }

    public RowValues prev() throws SQLException {
        if (datasourceManager == null) {
            throw new IllegalArgumentException("datasourceManager is null");
        }
        if (!initPrev) {
            initPrev = true;
            return convertRowSet(resultSet.last());
        }

        checkInitedInRsNext();
        return convertRowSet(resultSet.previous());

    }

    protected RowValues convertRowSet(boolean isOk) throws SQLException {
        prevRowData = currentRowData;
        if (isOk) {
            currentRowData = new ResultSetRowValues(cursorMetaData, resultSet);
        } else {
            currentRowData = null;
        }

        return currentRowData;
    }

    public boolean isDone() {
        return true;
    }

    public void cancel(boolean interruptedIfRunning) {
    }

    public boolean isCanceled() {
        return false;
    }

    public Transaction getTransaction() {
        return transaction;
    }

    public void setTransaction(Transaction transaction) {
        this.transaction = transaction;
    }

    public String getDataNodeId() {
        return dataNodeId;
    }

    public void setDataNodeId(String dataNodeId) {
        this.dataNodeId = dataNodeId;
    }

    public Boolean getIsStreaming() {
        return isStreaming;
    }

    public void setIsStreaming(Boolean isStreaming) {
        this.isStreaming = isStreaming;
    }

    public ResultSet getResultSet() {
        return this.resultSet;
    }

    public ExecutePlan getExecutePlan() {
        return this.executePlan;
    }

    public void setExecutePlan(ExecutePlan executePlan) {
        this.executePlan = executePlan;
    }

    public ExecuteContext getExecuteContext() {
        return executeContext;
    }

    public void setExecuteContext(ExecuteContext executeContext) {
        this.executeContext = executeContext;
    }

}
